package org.budo.warehouse.logic.util;

import java.util.ArrayList;
import java.util.List;

import org.budo.support.lang.util.StringUtil;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataEntryPojo;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.DataMessagePojo;
import org.budo.warehouse.service.entity.Pipeline;

import lombok.extern.slf4j.Slf4j;

/**
 * Static helpers that copy a {@link DataMessage} into a {@link DataMessagePojo}.
 *
 * @author limingwei
 */
@Slf4j
public class DataMessageLogicUtil {
    /** Table whose INSERT events are dropped by {@link #toMessageBuffer(DataMessage)}. */
    private static final String ENTRY_BUFFER_TABLE = "t_entry_buffer";

    /** Event type that is dropped for {@link #ENTRY_BUFFER_TABLE}. */
    private static final String EVENT_TYPE_INSERT = "INSERT";

    /**
     * Copies the message into a {@link DataMessagePojo} addressed to the pipeline's target data node.
     * Every entry is copied into a {@link DataEntryPojo} whose schema and table names are replaced
     * by the values {@link PipelineUtil} resolves for the given pipeline.
     *
     * @param dataMessage the message to copy; a {@code null} entry list is treated as empty
     * @param pipeline    the pipeline supplying the target schema, table and data node id
     * @return a new message with the same id, the pipeline's target data node id and the copied entries
     */
    public static DataMessagePojo toMessagePojo(DataMessage dataMessage, Pipeline pipeline) {
        List<DataEntry> dataEntries = entriesOf(dataMessage);
        List<DataEntry> dataEntryPojos = new ArrayList<DataEntry>(dataEntries.size());

        for (DataEntry dataEntry : dataEntries) {
            String targetSchema = PipelineUtil.targetSchema(pipeline, dataEntry);
            String targetTable = PipelineUtil.targetTable(pipeline, dataEntry);

            DataEntryPojo dataEntryPojo = new DataEntryPojo(dataEntry) // copied into a pojo for serialized transmission
                    .setSchemaName(targetSchema) // used for matching in the next step
                    .setTableName(targetTable);

            dataEntryPojos.add(dataEntryPojo);
        }

        Integer dataNodeId = pipeline.getTargetDataNodeId();
        return new DataMessagePojo(dataMessage.getId(), dataNodeId, dataEntryPojos);
    }

    /**
     * Copies the message into a {@link DataMessagePojo}, leaving out INSERT events on the
     * {@code t_entry_buffer} table. All other entries are copied into {@link DataEntryPojo}s.
     *
     * @param dataMessage the message to copy; a {@code null} entry list is treated as empty
     * @return a new message with the same id and data node id and the remaining entries
     */
    public static DataMessage toMessageBuffer(DataMessage dataMessage) {
        List<DataEntry> dataEntries = entriesOf(dataMessage);
        List<DataEntry> dataEntryPojos = new ArrayList<DataEntry>(dataEntries.size());

        for (DataEntry dataEntry : dataEntries) {
            String tableName = dataEntry.getTableName();
            String eventType = dataEntry.getEventType();

            if (StringUtil.equals(tableName, ENTRY_BUFFER_TABLE) && StringUtil.equals(eventType, EVENT_TYPE_INSERT)) {
                // Skip inserts into the buffer table itself to avoid causing recursion.
                // Parameterized message: the entry is only rendered when debug logging is enabled.
                log.debug("#230 dataEntry={}", dataEntry);
                continue;
            }

            dataEntryPojos.add(new DataEntryPojo(dataEntry));
        }

        Integer dataNodeId = dataMessage.getDataNodeId();
        return new DataMessagePojo(dataMessage.getId(), dataNodeId, dataEntryPojos);
    }

    /** Returns the message's entries, substituting an empty list when the message carries none. */
    private static List<DataEntry> entriesOf(DataMessage dataMessage) {
        List<DataEntry> dataEntries = dataMessage.getDataEntries();
        if (null == dataEntries) {
            return new ArrayList<DataEntry>(0);
        }
        return dataEntries;
    }
}